// This file is part of Substrate.

// Copyright (C) 2018-2021 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

//! Module implementing the logic for verifying and importing AuRa blocks.

use crate::{
	AuthorityId, find_pre_digest, slot_author, aura_err, Error, AuraSlotCompatible, SlotDuration,
	register_aura_inherent_data_provider, authorities,
};
use std::{
	sync::Arc, time::Duration, thread, marker::PhantomData, hash::Hash, fmt::Debug,
	collections::HashMap,
};
use log::{debug, info, trace};
use prometheus_endpoint::Registry;
use codec::{Encode, Decode, Codec};
use sp_consensus::{
	BlockImport, CanAuthorWith, ForkChoiceStrategy, BlockImportParams,
	BlockOrigin, Error as ConsensusError, BlockCheckParams, ImportResult,
	import_queue::{
		Verifier, BasicQueue, DefaultImportQueue, BoxJustificationImport,
	},
};
use sc_client_api::{backend::AuxStore, BlockOf};
use sp_blockchain::{well_known_cache_keys::{self, Id as CacheKeyId}, ProvideCache, HeaderBackend};
use sp_block_builder::BlockBuilder as BlockBuilderApi;
use sp_runtime::{generic::{BlockId, OpaqueDigestItemId}, Justification};
use sp_runtime::traits::{Block as BlockT, Header, DigestItemFor, Zero};
use sp_api::ProvideRuntimeApi;
use sp_core::crypto::Pair;
use sp_inherents::{InherentDataProviders, InherentData};
use sp_timestamp::InherentError as TIError;
use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_TRACE, CONSENSUS_DEBUG, CONSENSUS_INFO};
use sc_consensus_slots::{CheckedHeader, SlotCompatible, check_equivocation};
use sp_consensus_slots::Slot;
use sp_api::ApiExt;
use sp_consensus_aura::{
	digests::CompatibleDigestItem, AuraApi, inherents::AuraInherentData,
	ConsensusLog, AURA_ENGINE_ID,
};

/// check a header has been signed by the right key. If the slot is too far in the future, an error
/// will be returned. If it's successful, returns the pre-header and the digest item
/// containing the seal.
///
/// This digest item will always return `Some` when used with `as_aura_seal`.
fn check_header<C, B: BlockT, P: Pair>(
	client: &C,
	slot_now: Slot,
	mut header: B::Header,
	hash: B::Hash,
	authorities: &[AuthorityId<P>],
	check_for_equivocation: CheckForEquivocation,
) -> Result<CheckedHeader<B::Header, (Slot, DigestItemFor<B>)>, Error<B>> where
	DigestItemFor<B>: CompatibleDigestItem<P::Signature>,
	P::Signature: Codec,
	C: sc_client_api::backend::AuxStore,
	P::Public: Encode + Decode + PartialEq + Clone,
{
	let seal = match header.digest_mut().pop() {
		Some(x) => x,
		None => return Err(Error::HeaderUnsealed(hash)),
	};

	let sig = seal.as_aura_seal().ok_or_else(|| {
		aura_err(Error::HeaderBadSeal(hash))
	})?;

	let slot = find_pre_digest::<B, P::Signature>(&header)?;

	if slot > slot_now {
		header.digest_mut().push(seal);
		Ok(CheckedHeader::Deferred(header, slot))
	} else {
		// check the signature is valid under the expected authority and
		// chain state.
		let expected_author = match slot_author::<P>(slot, &authorities) {
			None => return Err(Error::SlotAuthorNotFound),
			Some(author) => author,
		};

		let pre_hash = header.hash();

		if P::verify(&sig, pre_hash.as_ref(), expected_author) {
			if check_for_equivocation.check_for_equivocation() {
				if let Some(equivocation_proof) = check_equivocation(
					client,
					slot_now,
					slot,
					&header,
					expected_author,
				).map_err(Error::Client)? {
					info!(
						target: "aura",
						"Slot author is equivocating at slot {} with headers {:?} and {:?}",
						slot,
						equivocation_proof.first_header.hash(),
						equivocation_proof.second_header.hash(),
					);
				}
			}

			Ok(CheckedHeader::Checked(header, (slot, seal)))
		} else {
			Err(Error::BadSignature(hash))
		}
	}
}

/// A verifier for Aura blocks.
pub struct AuraVerifier<C, P, CAW> {
	client: Arc<C>,
	phantom: PhantomData<P>,
	inherent_data_providers: InherentDataProviders,
	can_author_with: CAW,
	check_for_equivocation: CheckForEquivocation,
	telemetry: Option<TelemetryHandle>,
}

impl<C, P, CAW> AuraVerifier<C, P, CAW> {
	pub(crate) fn new(
		client: Arc<C>,
		inherent_data_providers: InherentDataProviders,
		can_author_with: CAW,
		check_for_equivocation: CheckForEquivocation,
		telemetry: Option<TelemetryHandle>,
	) -> Self {
		Self {
			client,
			inherent_data_providers,
			can_author_with,
			check_for_equivocation,
			telemetry,
			phantom: PhantomData,
		}
	}
}

impl<C, P, CAW> AuraVerifier<C, P, CAW> where
	P: Send + Sync + 'static,
	CAW: Send + Sync + 'static,
{
	fn check_inherents<B: BlockT>(
		&self,
		block: B,
		block_id: BlockId<B>,
		inherent_data: InherentData,
		timestamp_now: u64,
	) -> Result<(), Error<B>> where
		C: ProvideRuntimeApi<B>, C::Api: BlockBuilderApi<B>,
		CAW: CanAuthorWith<B>,
	{
		const MAX_TIMESTAMP_DRIFT_SECS: u64 = 60;

		if let Err(e) = self.can_author_with.can_author_with(&block_id) {
			debug!(
				target: "aura",
				"Skipping `check_inherents` as authoring version is not compatible: {}",
				e,
			);

			return Ok(())
		}

		let inherent_res = self.client.runtime_api().check_inherents(
			&block_id,
			block,
			inherent_data,
		).map_err(|e| Error::Client(e.into()))?;

		if !inherent_res.ok() {
			inherent_res
				.into_errors()
				.try_for_each(|(i, e)| match TIError::try_from(&i, &e) {
					Some(TIError::ValidAtTimestamp(timestamp)) => {
						// halt import until timestamp is valid.
						// reject when too far ahead.
						if timestamp > timestamp_now + MAX_TIMESTAMP_DRIFT_SECS {
							return Err(Error::TooFarInFuture);
						}

						let diff = timestamp.saturating_sub(timestamp_now);
						info!(
							target: "aura",
							"halting for block {} seconds in the future",
							diff
						);
						telemetry!(
							self.telemetry;
							CONSENSUS_INFO;
							"aura.halting_for_future_block";
							"diff" => ?diff
						);
						thread::sleep(Duration::from_secs(diff));
						Ok(())
					},
					Some(TIError::Other(e)) => Err(Error::Runtime(e.into())),
					None => Err(Error::DataProvider(
						self.inherent_data_providers.error_to_string(&i, &e)
					)),
				})
		} else {
			Ok(())
		}
	}
}

impl<B: BlockT, C, P, CAW> Verifier<B> for AuraVerifier<C, P, CAW> where
	C: ProvideRuntimeApi<B> +
		Send +
		Sync +
		sc_client_api::backend::AuxStore +
		ProvideCache<B> +
		BlockOf,
	C::Api: BlockBuilderApi<B> + AuraApi<B, AuthorityId<P>> + ApiExt<B>,
	DigestItemFor<B>: CompatibleDigestItem<P::Signature>,
	P: Pair + Send + Sync + 'static,
	P::Public: Send + Sync + Hash + Eq + Clone + Decode + Encode + Debug + 'static,
	P::Signature: Encode + Decode,
	CAW: CanAuthorWith<B> + Send + Sync + 'static,
{
	fn verify(
		&mut self,
		origin: BlockOrigin,
		header: B::Header,
		justification: Option<Justification>,
		mut body: Option<Vec<B::Extrinsic>>,
	) -> Result<(BlockImportParams<B, ()>, Option<Vec<(CacheKeyId, Vec<u8>)>>), String> {
		let mut inherent_data = self.inherent_data_providers
			.create_inherent_data()
			.map_err(|e| e.into_string())?;
		let (timestamp_now, slot_now, _) = AuraSlotCompatible.extract_timestamp_and_slot(&inherent_data)
			.map_err(|e| format!("Could not extract timestamp and slot: {:?}", e))?;
		let hash = header.hash();
		let parent_hash = *header.parent_hash();
		let authorities = authorities(self.client.as_ref(), &BlockId::Hash(parent_hash))
			.map_err(|e| format!("Could not fetch authorities at {:?}: {:?}", parent_hash, e))?;

		// we add one to allow for some small drift.
		// FIXME #1019 in the future, alter this queue to allow deferring of
		// headers
		let checked_header = check_header::<C, B, P>(
			&self.client,
			slot_now + 1,
			header,
			hash,
			&authorities[..],
			self.check_for_equivocation,
		).map_err(|e| e.to_string())?;
		match checked_header {
			CheckedHeader::Checked(pre_header, (slot, seal)) => {
				// if the body is passed through, we need to use the runtime
				// to check that the internally-set timestamp in the inherents
				// actually matches the slot set in the seal.
				if let Some(inner_body) = body.take() {
					inherent_data.aura_replace_inherent_data(slot);
					let block = B::new(pre_header.clone(), inner_body);

					// skip the inherents verification if the runtime API is old.
					if self.client
						.runtime_api()
						.has_api_with::<dyn BlockBuilderApi<B>, _>(
							&BlockId::Hash(parent_hash),
							|v| v >= 2,
						)
						.map_err(|e| format!("{:?}", e))?
					{
						self.check_inherents(
							block.clone(),
							BlockId::Hash(parent_hash),
							inherent_data,
							timestamp_now,
						).map_err(|e| e.to_string())?;
					}

					let (_, inner_body) = block.deconstruct();
					body = Some(inner_body);
				}

				trace!(target: "aura", "Checked {:?}; importing.", pre_header);
				telemetry!(
					self.telemetry;
					CONSENSUS_TRACE;
					"aura.checked_and_importing";
					"pre_header" => ?pre_header,
				);

				// Look for an authorities-change log.
				let maybe_keys = pre_header.digest()
					.logs()
					.iter()
					.filter_map(|l| l.try_to::<ConsensusLog<AuthorityId<P>>>(
						OpaqueDigestItemId::Consensus(&AURA_ENGINE_ID)
					))
					.find_map(|l| match l {
						ConsensusLog::AuthoritiesChange(a) => Some(
							vec![(well_known_cache_keys::AUTHORITIES, a.encode())]
						),
						_ => None,
					});

				let mut import_block = BlockImportParams::new(origin, pre_header);
				import_block.post_digests.push(seal);
				import_block.body = body;
				import_block.justification = justification;
				import_block.fork_choice = Some(ForkChoiceStrategy::LongestChain);
				import_block.post_hash = Some(hash);

				Ok((import_block, maybe_keys))
			}
			CheckedHeader::Deferred(a, b) => {
				debug!(target: "aura", "Checking {:?} failed; {:?}, {:?}.", hash, a, b);
				telemetry!(
					self.telemetry;
					CONSENSUS_DEBUG;
					"aura.header_too_far_in_future";
					"hash" => ?hash,
					"a" => ?a,
					"b" => ?b,
				);
				Err(format!("Header {:?} rejected: too far in the future", hash))
			}
		}
	}
}

fn initialize_authorities_cache<A, B, C>(client: &C) -> Result<(), ConsensusError> where
	A: Codec + Debug,
	B: BlockT,
	C: ProvideRuntimeApi<B> + BlockOf + ProvideCache<B>,
	C::Api: AuraApi<B, A>,
{
	// no cache => no initialization
	let cache = match client.cache() {
		Some(cache) => cache,
		None => return Ok(()),
	};

	// check if we already have initialized the cache
	let map_err = |error| sp_consensus::Error::from(sp_consensus::Error::ClientImport(
		format!(
			"Error initializing authorities cache: {}",
			error,
		)));

	let genesis_id = BlockId::Number(Zero::zero());
	let genesis_authorities: Option<Vec<A>> = cache
		.get_at(&well_known_cache_keys::AUTHORITIES, &genesis_id)
		.unwrap_or(None)
		.and_then(|(_, _, v)| Decode::decode(&mut &v[..]).ok());
	if genesis_authorities.is_some() {
		return Ok(());
	}

	let genesis_authorities = authorities(client, &genesis_id)?;
	cache.initialize(&well_known_cache_keys::AUTHORITIES, genesis_authorities.encode())
		.map_err(map_err)?;

	Ok(())
}

/// A block-import handler for Aura.
pub struct AuraBlockImport<Block: BlockT, C, I: BlockImport<Block>, P> {
	inner: I,
	client: Arc<C>,
	_phantom: PhantomData<(Block, P)>,
}

impl<Block: BlockT, C, I: Clone + BlockImport<Block>, P> Clone for AuraBlockImport<Block, C, I, P> {
	fn clone(&self) -> Self {
		AuraBlockImport {
			inner: self.inner.clone(),
			client: self.client.clone(),
			_phantom: PhantomData,
		}
	}
}

impl<Block: BlockT, C, I: BlockImport<Block>, P> AuraBlockImport<Block, C, I, P> {
	/// New aura block import.
	pub fn new(
		inner: I,
		client: Arc<C>,
	) -> Self {
		Self {
			inner,
			client,
			_phantom: PhantomData,
		}
	}
}

impl<Block: BlockT, C, I, P> BlockImport<Block> for AuraBlockImport<Block, C, I, P> where
	I: BlockImport<Block, Transaction = sp_api::TransactionFor<C, Block>> + Send + Sync,
	I::Error: Into<ConsensusError>,
	C: HeaderBackend<Block> + ProvideRuntimeApi<Block>,
	P: Pair + Send + Sync + 'static,
	P::Public: Clone + Eq + Send + Sync + Hash + Debug + Encode + Decode,
	P::Signature: Encode + Decode,
{
	type Error = ConsensusError;
	type Transaction = sp_api::TransactionFor<C, Block>;

	fn check_block(
		&mut self,
		block: BlockCheckParams<Block>,
	) -> Result<ImportResult, Self::Error> {
		self.inner.check_block(block).map_err(Into::into)
	}

	fn import_block(
		&mut self,
		block: BlockImportParams<Block, Self::Transaction>,
		new_cache: HashMap<CacheKeyId, Vec<u8>>,
	) -> Result<ImportResult, Self::Error> {
		let hash = block.post_hash();
		let slot = find_pre_digest::<Block, P::Signature>(&block.header)
			.expect("valid Aura headers must contain a predigest; \
					 header has been already verified; qed");

		let parent_hash = *block.header.parent_hash();
		let parent_header = self.client.header(BlockId::Hash(parent_hash))
			.map_err(|e| ConsensusError::ChainLookup(e.to_string()))?
			.ok_or_else(|| ConsensusError::ChainLookup(aura_err(
				Error::<Block>::ParentUnavailable(parent_hash, hash)
			).into()))?;

		let parent_slot = find_pre_digest::<Block, P::Signature>(&parent_header)
			.expect("valid Aura headers contain a pre-digest; \
					parent header has already been verified; qed");

		// make sure that slot number is strictly increasing
		if slot <= parent_slot {
			return Err(
				ConsensusError::ClientImport(aura_err(
					Error::<Block>::SlotMustIncrease(parent_slot, slot)
				).into())
			);
		}

		self.inner.import_block(block, new_cache).map_err(Into::into)
	}
}

/// Should we check for equivocation of a block author?
#[derive(Debug, Clone, Copy)]
pub enum CheckForEquivocation {
	/// Yes, check for equivocation.
	///
	/// This is the default setting for this.
	Yes,
	/// No, don't check for equivocation.
	No,
}

impl CheckForEquivocation {
	/// Should we check for equivocation?
	fn check_for_equivocation(self) -> bool {
		matches!(self, Self::Yes)
	}
}

impl Default for CheckForEquivocation {
	fn default() -> Self {
		Self::Yes
	}
}

/// Parameters of [`import_queue`].
pub struct ImportQueueParams<'a, Block, I, C, S, CAW> {
	/// The block import to use.
	pub block_import: I,
	/// The justification import.
	pub justification_import: Option<BoxJustificationImport<Block>>,
	/// The client to interact with the chain.
	pub client: Arc<C>,
	/// The inherent data provider, to create the inherent data.
	pub inherent_data_providers: InherentDataProviders,
	/// The spawner to spawn background tasks.
	pub spawner: &'a S,
	/// The prometheus registry.
	pub registry: Option<&'a Registry>,
	/// Can we author with the current node?
	pub can_author_with: CAW,
	/// Should we check for equivocation?
	pub check_for_equivocation: CheckForEquivocation,
	/// The duration of one slot.
	pub slot_duration: SlotDuration,
	/// Telemetry instance used to report telemetry metrics.
	pub telemetry: Option<TelemetryHandle>,
}

/// Start an import queue for the Aura consensus algorithm.
pub fn import_queue<'a, P, Block, I, C, S, CAW>(
	ImportQueueParams {
		block_import,
		justification_import,
		client,
		inherent_data_providers,
		spawner,
		registry,
		can_author_with,
		check_for_equivocation,
		slot_duration,
		telemetry,
	}: ImportQueueParams<'a, Block, I, C, S, CAW>
) -> Result<DefaultImportQueue<Block, C>, sp_consensus::Error> where
	Block: BlockT,
	C::Api: BlockBuilderApi<Block> + AuraApi<Block, AuthorityId<P>> + ApiExt<Block>,
	C: 'static
		+ ProvideRuntimeApi<Block>
		+ BlockOf
		+ ProvideCache<Block>
		+ Send
		+ Sync
		+ AuxStore
		+ HeaderBackend<Block>,
	I: BlockImport<Block, Error=ConsensusError, Transaction = sp_api::TransactionFor<C, Block>>
		+ Send
		+ Sync
		+ 'static,
	DigestItemFor<Block>: CompatibleDigestItem<P::Signature>,
	P: Pair + Send + Sync + 'static,
	P::Public: Clone + Eq + Send + Sync + Hash + Debug + Encode + Decode,
	P::Signature: Encode + Decode,
	S: sp_core::traits::SpawnEssentialNamed,
	CAW: CanAuthorWith<Block> + Send + Sync + 'static,
{
	register_aura_inherent_data_provider(&inherent_data_providers, slot_duration.get())?;
	initialize_authorities_cache(&*client)?;

	let verifier = AuraVerifier::<_, P, _>::new(
		client,
		inherent_data_providers,
		can_author_with,
		check_for_equivocation,
		telemetry,
	);

	Ok(BasicQueue::new(
		verifier,
		Box::new(block_import),
		justification_import,
		spawner,
		registry,
	))
}
